线程池与协程池的原理与实现【c++与gloang】【万字分析】

您所在的位置:网站首页 协程池 go 线程池与协程池的原理与实现【c++与gloang】【万字分析】

线程池与协程池的原理与实现【c++与gloang】【万字分析】

2023-12-17 07:40| 来源: 网络整理| 查看: 265

文章目录 前言一、线程池基础知识1、线程池概念2、线程池组成 二、任务队列的实现1、定义任务结构体以及任务队列2、实现类函数 三、线程池的实现1、类的定义2、构造与析构函数2、worker的实现3、manager的实现4、添加任务以及其他添加任务以及获取正在工作的、存活的线程数目线程退出操作 四、线程池测试五、协程池1、任务结构体2、协程池3、运行 线程池完整代码

前言

C++ 线程池是一种多线程编程实现方式,它可以将多个任务并发执行,提高程序的运行效率。具体而言,线程池维护一个线程集合,在程序运行时创建一定数量的线程,并将任务插入到队列中,等待空闲线程来执行。 Go语言中的线程池通常指的是协程池,它是一种通过重复利用大量轻量级协程来节省系统开销和提高并发性能的技术。协程轻量、创建和销毁成本低。但是,如果简单地在程序中创建大量的协程,会导致系统开销较大,降低性能。因此,使用协程池管理和控制协程数量是一种有效的解决方案

下面本人使用c++代码实现线程池以及使用glang实现协程池,并且进行总结。

一、线程池基础知识 1、线程池概念

我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件), 则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

2、线程池组成

线程池的组成主要分为 3 个部分,这三部分配合工作就可以得到一个完整的线程池:

任务队列:存储需要处理的任务,由工作的线程来处理这些任务 通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除 已处理的任务会被从任务队列中删除 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程 工作的线程(任务队列任务的消费者) ,N个 线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理 工作的线程相当于是任务队列的消费者角色, 如果任务队列为空,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞) 如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作 管理者线程(不处理任务队列中的任务),1个 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测 当任务过多的时候,可以适当的创建一些新的工作线程 当任务过少的时候,可以适当的销毁一些工作的线程

在这里插入图片描述

二、任务队列的实现 1、定义任务结构体以及任务队列

其中 Task 是任务类,里边有两个成员,分别是两个指针 void()(void) 和 void*

另外一个类 TaskQueue 是任务队列,提供了添加任务、取出任务、存储任务、获取任务个数、线程同步的功能。 addTask 函数用来向任务队列中添加任务,可以接受一个已经构造好的任务对象或者直接传入一个回调函数和它的参数来创建任务对象并添加到队列中。takeTask 函数用来从任务队列中取出一个任务,并返回对应的任务对象。taskNumber 函数则用来获取当前任务队列中的任务数量,通过内联函数来提高函数调用效率。

在进行对任务队列的修改操作时,需要先获取 m_mutex 互斥锁,以保证线程安全性。一个线程获取到锁时,其他线程将会阻塞等待,直到获取到锁的线程释放了该锁

using callback = void (*)(void*); struct Task { Task() { function = nullptr; arg = nullptr; } // 初始化 任务 Task(callback f, void* arg) { function = f; this->arg = arg; } callback function; void* arg; }; class TaskQueue { private: pthread_mutex_t m_mutex; // 互斥锁 std::queue m_queue; // 任务队列 public: TaskQueue(); ~TaskQueue(); // 添加任务 void addTask(Task& task); void addTask(callback func, void* arg); // 取出一个任务 Task takeTask(); // 获取当前队列中任务个数 定义为内联函数 提高效率 代码块的替换 inline int taskNumber() { return m_queue.size(); } }; 2、实现类函数

在默认构造函数中,调用 pthread_mutex_init 函数初始化了互斥锁 m_mutex。在析构函数中,调用 pthread_mutex_destroy 函数销毁互斥锁 m_mutex。这样可以保证资源的正确释放,避免内存泄露。

addTask 函数用来向任务队列中添加任务。首先需要获取互斥锁 m_mutex,确保线程安全。然后将任务对象或回调函数和参数封装成任务对象,并将任务对象压入任务队列中。最后释放互斥锁。

takeTask 函数用于从任务队列中取出一个任务。首先获取互斥锁 m_mutex,确保线程安全。如果队列不为空,则取出队首元素作为返回值,并将其从队列中删除;否则返回一个空的任务对象。最终释放互斥锁。

TaskQueue::TaskQueue() { pthread_mutex_init(&m_mutex, NULL); } TaskQueue::~TaskQueue() { pthread_mutex_destroy(&m_mutex); } void TaskQueue::addTask(Task& task) { pthread_mutex_lock(&m_mutex); m_queue.push(task); pthread_mutex_unlock(&m_mutex); } void TaskQueue::addTask(callback func, void* arg) { pthread_mutex_lock(&m_mutex); Task task; task.function = func; task.arg = arg; m_queue.push(task); pthread_mutex_unlock(&m_mutex); } Task TaskQueue::takeTask() { Task t; pthread_mutex_lock(&m_mutex); if (m_queue.size() > 0) { t = m_queue.front(); m_queue.pop(); } pthread_mutex_unlock(&m_mutex); return t; } 三、线程池的实现 1、类的定义

m_taskQ: 任务队列指针,用于存放待执行的任务。 m_lock: 线程池锁,用于保护线程池的操作不被多个线程同时访问和修改。 m_notEmpty: 条件变量,用于当任务队列为空时等待新任务的到来。 m_threadIDs: 线程 ID 数组,保存所有工作线程的 ID。 m_managerID: 管理者线程 ID,用于管理线程池中的工作线程。 m_minNum: 最小线程数,线程池初始大小,至少拥有 m_minNum 个线程。 m_maxNum: 最大线程数,线程池允许的最大线程数,不能超过此数目。 m_busyNum: 正在执行任务的工作线程数。 m_aliveNum: 存活状态的线程数(即没有被销毁的线程数)。 m_exitNum: 要销毁的线程数。 m_shutdown: 线程池是否关闭,如果为 true,线程池将不再接收新任务。 主要成员函数有:

ThreadPool(int min, int max): 构造函数,创建一个线程池对象。 void addTask(Task task): 向任务队列中添加一个新的任务。 int getBusyNumber(): 返回正在执行任务的工作线程数目。 int getAliveNumber(): 返回池中存活线程数量。 ~ThreadPool(): 析构函数,销毁线程池对象。

class ThreadPool { private: TaskQueue* m_taskQ; pthread_mutex_t m_lock; // 锁整个的线程池 pthread_cond_t m_notEmpty; // 任务队列是不是空了 pthread_t* m_threadIDs; // 保存线程id的数组 会把线程id保存在这个数组中 pthread_t m_managerID; // 管理者线程ID int m_minNum; // 最小线程数量 int m_maxNum; // 最大线程数量 int m_busyNum; // 忙的线程的个数 int m_aliveNum; // 存活的线程的个数 int m_exitNum; // 要销毁的线程个数 bool m_shutdown = false; public: // 传入 工作线程最大以及最小值 ThreadPool(int min, int max); // 添加任务 void addTask(Task task); // 获取忙线程的个数 int getBusyNumber(); // 获取活着的线程个数 int getAliveNumber(); ~ThreadPool(); private: // 工作的线程的任务函数 static void* worker(void* arg); // 管理者线程的任务函数 static void* manager(void* arg); void threadExit(); }; 2、构造与析构函数

实例化一个任务队列 m_taskQ,并根据传入的参数 minNum 和 maxNum 初始化了线程池的各项属性。然后,根据最小线程数 minNum 创建相应数量的线程,并将他们分配到线程数组 m_threadIDs 中。最后,创建管理者线程 m_managerID。其中,线程的回调函数为 worker,管理者线程的回调函数为 manager

ThreadPool::ThreadPool(int minNum, int maxNum) { // 实例化任务队列 // 实例化任务队列 m_taskQ = new TaskQueue; do { // 初始化线程池 m_minNum = minNum; m_maxNum = maxNum; m_busyNum = 0; m_aliveNum = minNum; // 根据线程的最大上限给线程数组分配内存 m_threadIDs = new pthread_t[maxNum]; if (m_threadIDs == nullptr) { cout pthread_create(&m_threadIDs[i], NULL, worker, this); cout pthread_cond_signal(&m_notEmpty); } if (m_taskQ) delete m_taskQ; if (m_threadIDs) delete[] m_threadIDs; pthread_mutex_destroy(&m_lock); pthread_cond_destroy(&m_notEmpty); } 2、worker的实现

首先获取互斥锁 pthread_mutex_lock(),以便访问共享资源。 然后判断任务队列是否为空,如果为空则阻塞线程,并等待条件变量 m_notEmpty 的通知。这里使用 while 循环判断条件队列是否为空,是为了防止虚假唤醒。 在等待期间,如果收到销毁线程的通知,则执行相应的线程销毁操作。 如果任务队列不为空,则从队列中取出一个任务并将忙碌线程数量加一。 执行任务并将忙碌线程数量减一。 最后释放互斥锁,以便其他线程可以访问共享资源

// 工作的线程的任务函数 void* ThreadPool::worker(void* arg) { // 强制类型转换 传进来的是 pool的 this ThreadPool* pool = static_cast(arg); // 一直不停的工作 while (true) { // 访问任务队列(共享资源)加锁 pthread_mutex_lock(&pool->m_lock); // 判断任务队列是否为空, 如果为空工作线程阻塞 while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown) { cout pool->m_aliveNum--; pthread_mutex_unlock(&pool->m_lock); pool->threadExit(); } } } // 判断线程池是否被关闭了 if (pool->m_shutdown) { pthread_mutex_unlock(&pool->m_lock); pool->threadExit(); } // 从任务队列中取出一个任务 Task task = pool->m_taskQ->takeTask(); // 工作的线程+1 pool->m_busyNum++; // 线程池解锁 pthread_mutex_unlock(&pool->m_lock); // 执行任务 cout // 每隔5s检测一次 sleep(5); // 取出线程池中的任务数和线程数量 // 取出工作的线程池数量 pthread_mutex_lock(&pool->m_lock); int queueSize = pool->m_taskQ->taskNumber(); int liveNum = pool->m_aliveNum; int busyNum = pool->m_busyNum; pthread_mutex_unlock(&pool->m_lock); // 添加或者销户线程 // 规则::任务的个数>存活的线程个数 && 存活的线程数存活的线程数 && 存活的线程数 liveNum && liveNum m_maxNum) { // 线程池加锁 pthread_mutex_lock(&pool->m_lock); int num = 0; for (int i = 0; i m_maxNum && num m_aliveNum m_maxNum; ++i) { if (pool->m_threadIDs[i] == 0) { pthread_create(&pool->m_threadIDs[i], NULL, worker, pool); num++; pool->m_aliveNum++; } } pthread_mutex_unlock(&pool->m_lock); } // 销毁多余的线程 // 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量 if (busyNum * 2 pool->m_minNum) { pthread_mutex_lock(&pool->m_lock); pool->m_exitNum = NUMBER; pthread_mutex_unlock(&pool->m_lock); // 通过信号让线程自杀 for (int i = 0; i if (m_shutdown) { return; } // 添加任务,不需要加锁,任务队列中有锁 m_taskQ->addTask(task); // 唤醒工作的线程 pthread_cond_signal(&m_notEmpty); } // 获取忙线程的个数 int ThreadPool::getBusyNumber() { int threadNum = 0; pthread_mutex_lock(&m_lock); threadNum = m_aliveNum; pthread_mutex_unlock(&m_lock); return threadNum; } // 获取活着的线程个数 int ThreadPool::getAliveNumber() { int busyNum = 0; pthread_mutex_lock(&m_lock); busyNum = m_busyNum; pthread_mutex_unlock(&m_lock); return busyNum; } 线程退出操作

1、获取当前线程的线程 ID pthread_t tid = pthread_self()。 2、遍历线程池中的线程 ID 数组 m_threadIDs,查找当前线程的线程 ID 是否在数组中。如果找到了,则将对应位置的线程 ID 置为 0,并打印输出退出信息。 3、最后,通过调用 pthread_exit(NULL) 函数退出当前线程。

void ThreadPool::threadExit() { // 获取当前线程的线程id pthread_t tid = pthread_self(); for (int i = 0; i // 如果等于当前线程id的时候 直接等于0 就行了 cout ThreadPool pool(5, 10); for (int i = 0; i f func() error //一个无参的函数类型 } // 创建新的任务 func NewTask(f func()error )*Task{ task:=Task{ f:f, } return &task } //执行Task任务的方法 func (t *Task) Execute() { t.f() //调用任务所绑定的函数 } 2、协程池

EntryChannel是对外接收Task任务的入口。在main()函数中,我们创建了一个Task,并将其交给协程池p的EntryChannel处理。EntryChannel起到了暴露接口的作用,就像是一个物流公司的门店,客户可以将订单送进门店,并让物流公司负责运输。 JobsChannel则是协程池内部的任务队列。所有被EntryChannel接收到的Task都会先放入JobsChannel。Worker协程会从JobsChannel中取任务并执行。这样做的好处是可以实现任务异步处理,提高程序执行效率,类似于一条流水线。

在Run方法中,首先会根据worker数量启动对应数量的goroutine来执行任务。然后通过for-range循环从EntryChannel中读取任务,并将其放入JobsChannel中去。之后每个工作线程会从JobsChannel中读取任务并执行该任务对应的函数。

同时,在Run方法执行完毕后,需要关闭JobsChannel以保证所有任务都已经执行完成。而关闭EntryChannel则是为了告知协程池已经没有新的任务需要添加进来了。

type Pool struct { //对外接收Task的入口 EntryChannel chan *Task //协程池最大worker数量,限定Goroutine的个数 worker_num int //协程池内部的任务就绪队列 JobsChannel chan *Task } //创建一个协程池 func NewPool(cap int) *Pool { p := Pool{ EntryChannel: make(chan *Task), worker_num: cap, JobsChannel: make(chan *Task), } return &p } //协程池创建一个worker并且开始工作 func (p *Pool) worker(work_ID int) { //worker不断的从JobsChannel内部任务队列中拿任务 for task := range p.JobsChannel { //如果拿到任务,则执行task任务 task.Execute() fmt.Println("worker ID ", work_ID, " 执行完毕任务") } } //让协程池Pool开始工作 func (p *Pool) Run() { //1,首先根据协程池的worker数量限定,开启固定数量的Worker, // 每一个Worker用一个Goroutine承载 for i := 0; i p.JobsChannel fmt.Println(time.Now()) return nil }) //创建一个协程池,最大开启3个协程worker p := NewPool(3) //开一个协程 不断的向 Pool 输送打印一条时间的task任务 //启动协程池p go func() { for j:=0;j Task() { function = nullptr; arg = nullptr; } // 初始化 任务 Task(callback f, void* arg) { function = f; this->arg = arg; } callback function; void* arg; }; class TaskQueue { private: pthread_mutex_t m_mutex; // 互斥锁 std::queue m_queue; // 任务队列 public: TaskQueue(); ~TaskQueue(); // 添加任务 void addTask(Task& task); void addTask(callback func, void* arg); // 取出一个任务 Task takeTask(); // 获取当前队列中任务个数 定义为内联函数 提高效率 代码块的替换 inline int taskNumber() { return m_queue.size(); } }; class ThreadPool { private: TaskQueue* m_taskQ; pthread_mutex_t m_lock; // 锁整个的线程池 pthread_cond_t m_notEmpty; // 任务队列是不是空了 pthread_t* m_threadIDs; // 保存线程id的数组 会把线程id保存在这个数组中 pthread_t m_managerID; // 管理者线程ID int m_minNum; // 最小线程数量 int m_maxNum; // 最大线程数量 int m_busyNum; // 忙的线程的个数 int m_aliveNum; // 存活的线程的个数 int m_exitNum; // 要销毁的线程个数 bool m_shutdown = false; public: // 传入 工作线程最大以及最小值 ThreadPool(int min, int max); // 添加任务 void addTask(Task task); // 获取忙线程的个数 int getBusyNumber(); // 获取活着的线程个数 int getAliveNumber(); ~ThreadPool(); private: // 工作的线程的任务函数 static void* worker(void* arg); // 管理者线程的任务函数 static void* manager(void* arg); void threadExit(); };

源文件

#include "ThreadPool.h" TaskQueue::TaskQueue() { pthread_mutex_init(&m_mutex, NULL); } TaskQueue::~TaskQueue() { pthread_mutex_destroy(&m_mutex); } void TaskQueue::addTask(Task& task) { pthread_mutex_lock(&m_mutex); m_queue.push(task); pthread_mutex_unlock(&m_mutex); } void TaskQueue::addTask(callback func, void* arg) { pthread_mutex_lock(&m_mutex); Task task; task.function = func; task.arg = arg; m_queue.push(task); pthread_mutex_unlock(&m_mutex); } Task TaskQueue::takeTask() { Task t; pthread_mutex_lock(&m_mutex); if (m_queue.size() > 0) { t = m_queue.front(); m_queue.pop(); } pthread_mutex_unlock(&m_mutex); return t; } ThreadPool::ThreadPool(int minNum, int maxNum) { // 实例化任务队列 m_taskQ = new TaskQueue; // 初始化线程池 m_minNum = minNum; m_maxNum = maxNum; m_busyNum = 0; m_aliveNum = minNum; // 根据线程的最大上限给线程数组分配内存 m_threadIDs = new pthread_t[maxNum]; if (m_threadIDs == nullptr) { delete m_taskQ; cout pthread_create(&m_threadIDs[i], NULL, worker, this); cout pthread_cond_signal(&m_notEmpty); } if (m_taskQ) delete m_taskQ; if (m_threadIDs) delete[] m_threadIDs; pthread_mutex_destroy(&m_lock); pthread_cond_destroy(&m_notEmpty); } // 添加任务 void ThreadPool::addTask(Task task) { if (m_shutdown) { return; } // 添加任务,不需要加锁,任务队列中有锁 m_taskQ->addTask(task); // 唤醒工作的线程 pthread_cond_signal(&m_notEmpty); } // 获取忙线程的个数 int ThreadPool::getBusyNumber() { int threadNum = 0; pthread_mutex_lock(&m_lock); threadNum = m_aliveNum; pthread_mutex_unlock(&m_lock); return threadNum; } // 获取活着的线程个数 int ThreadPool::getAliveNumber() { int busyNum = 0; pthread_mutex_lock(&m_lock); busyNum = m_busyNum; pthread_mutex_unlock(&m_lock); return busyNum; } // 工作的线程的任务函数 void* ThreadPool::worker(void* arg) { // 强制类型转换 传进来的是 pool的 this ThreadPool* pool = static_cast(arg); // 一直不停的工作 while (true) { // 访问任务队列(共享资源)加锁 pthread_mutex_lock(&pool->m_lock); // 判断任务队列是否为空, 如果为空工作线程阻塞 while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown) { // cout m_exitNum > 0) { pool->m_exitNum--; if (pool->m_aliveNum > pool->m_minNum) { pool->m_aliveNum--; pthread_mutex_unlock(&pool->m_lock); pool->threadExit(); } } } // 判断线程池是否被关闭了 if (pool->m_shutdown) { pthread_mutex_unlock(&pool->m_lock); pool->threadExit(); } // 从任务队列中取出一个任务 Task task = pool->m_taskQ->takeTask(); // 工作的线程+1 pool->m_busyNum++; // 线程池解锁 pthread_mutex_unlock(&pool->m_lock); // 执行任务 // cout // 每隔5s检测一次 sleep(5); // 取出线程池中的任务数和线程数量 // 取出工作的线程池数量 pthread_mutex_lock(&pool->m_lock); int queueSize = pool->m_taskQ->taskNumber(); int liveNum = pool->m_aliveNum; int busyNum = pool->m_busyNum; pthread_mutex_unlock(&pool->m_lock); // 添加或者销户线程 // 规则::任务的个数>存活的线程个数 && 存活的线程数存活的线程数 && 存活的线程数 liveNum && liveNum m_maxNum) { // 线程池加锁 pthread_mutex_lock(&pool->m_lock); int num = 0; for (int i = 0; i m_maxNum && num m_aliveNum m_maxNum; ++i) { if (pool->m_threadIDs[i] == 0) { pthread_create(&pool->m_threadIDs[i], NULL, worker, pool); num++; pool->m_aliveNum++; } } pthread_mutex_unlock(&pool->m_lock); } // 销毁多余的线程 // 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量 if (busyNum * 2 pool->m_minNum) { pthread_mutex_lock(&pool->m_lock); pool->m_exitNum = NUMBER; pthread_mutex_unlock(&pool->m_lock); // 通过信号让线程自杀 for (int i = 0; i // 获取当前线程的线程id pthread_t tid = pthread_self(); for (int i = 0; i // 如果等于当前线程id的时候 直接等于0 就行了 // cout


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3